#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Wrapper that runs a host test. Handles timeout and stopping the emulator."""

from __future__ import print_function

import argparse
import enum
import io
import os
import pathlib
import select
import subprocess
import sys
import time


class TestResult(enum.Enum):
  """An Enum representing the result of running a test."""
  SUCCESS = enum.auto()
  FAIL = enum.auto()
  TIMEOUT = enum.auto()
  UNEXPECTED_TERMINATION = enum.auto()

  @property
  def exit_code(self):
    if self is TestResult.SUCCESS:
      return 0
    return 1

  @property
  def reason(self):
    return {
        TestResult.SUCCESS: 'passed',
        TestResult.FAIL: 'failed',
        TestResult.TIMEOUT: 'timed out',
        TestResult.UNEXPECTED_TERMINATION: 'terminated unexpectedly',
    }[self]


def run_test(path, timeout=10):
  start_time = time.monotonic()
  env = dict(os.environ)
  env['ASAN_OPTIONS'] = 'log_path=stderr'

  proc = subprocess.Popen(
      [path],
      bufsize=0,
      stdin=subprocess.PIPE,
      stdout=subprocess.PIPE,
      env=env)

  # Put the output pipe in non-blocking mode. We will then select(2)
  # on the pipe to know when we have bytes to process.
  os.set_blocking(proc.stdout.fileno(), False)

  try:
    output_buffer = io.BytesIO()
    while True:
      select_timeout = timeout - (time.monotonic() - start_time)
      if select_timeout <= 0:
        return TestResult.TIMEOUT, output_buffer.getvalue()

      readable, _, _ = select.select([proc.stdout], [], [], select_timeout)

      if not readable:
        # Indicates that select(2) timed out.
        return TestResult.TIMEOUT, output_buffer.getvalue()

      output_buffer.write(proc.stdout.read())
      output_log = output_buffer.getvalue()

      if b'Pass!' in output_log:
        return TestResult.SUCCESS, output_log
      if b'Fail!' in output_log:
        return TestResult.FAIL, output_log
      if proc.poll():
        return TestResult.UNEXPECTED_TERMINATION, output_log
  finally:
    # Check if the process has exited. If not, send it a SIGTERM, wait for it
    # to exit, and if it times out, kill the process directly.
    if not proc.poll():
      try:
        proc.terminate()
        proc.wait(timeout)
      except subprocess.TimeoutExpired:
        proc.kill()


def parse_options(argv):
  parser = argparse.ArgumentParser()
  parser.add_argument('-t', '--timeout', type=float, default=60,
                      help='Timeout to kill test after.')
  parser.add_argument('--coverage', action='store_const', const='coverage',
                      default='host', dest='test_target',
                      help='Flag if this is a code coverage test.')
  parser.add_argument('test_name', type=str)
  return parser.parse_args(argv)


def main(argv):
  opts = parse_options(argv)

  # Tests will be located in build/host, unless the --coverage flag was
  # provided, in which case they will be in build/coverage.
  exec_path = pathlib.Path('build', opts.test_target, opts.test_name,
                           f'{opts.test_name}.exe')
  if not exec_path.is_file():
    print(f'No test named {opts.test_name} exists!')
    return 1

  start_time = time.monotonic()
  result, output = run_test(exec_path, timeout=opts.timeout)
  elapsed_time = time.monotonic() - start_time

  print('{} {}! ({:.3f} seconds)'.format(
      opts.test_name, result.reason, elapsed_time),
        file=sys.stderr)

  if result is not TestResult.SUCCESS:
    print('====== Emulator output ======', file=sys.stderr)
    print(output.decode('utf-8'), file=sys.stderr)
    print('=============================', file=sys.stderr)
  return result.exit_code


if __name__ == '__main__':
  sys.exit(main(sys.argv[1:]))
